/*-------------------------------------------------------------------------
 *
 * syncrep.c
 *
 * Synchronous replication is new as of PostgreSQL 9.1.
 *
 * If requested, transaction commits wait until their commit LSNs are
 * acknowledged by the synchronous standbys.
 *
 * This module contains the code for waiting and release of backends.
 * All code in this module executes on the primary. The core streaming
 * replication transport remains within WALreceiver/WALsender modules.
 *
 * The essence of this design is that it isolates all logic about
 * waiting/releasing onto the primary. The primary defines which standbys
 * it wishes to wait for. The standbys are completely unaware of the
 * durability requirements of transactions on the primary, reducing the
 * complexity of the code and streamlining both standby operations and
 * network bandwidth because there is no requirement to ship
 * per-transaction state information.
 *
 * Replication is either synchronous or not synchronous (async). If it is
 * async, we just fastpath out of here. If it is sync, then we wait for
 * the write, flush or apply location on the standby before releasing
 * the waiting backend. Further complexity in that interaction is
 * expected in later releases.
 *
 * The best performing way to manage the waiting backends is to have a
 * single ordered queue of waiting backends, so that we can avoid
 * searching through all waiters each time we receive a reply.
 *
 * In 9.5 or before only a single standby could be considered as
 * synchronous. In 9.6 we support a priority-based multiple synchronous
 * standbys. In 10.0 a quorum-based multiple synchronous standbys is also
 * supported. The number of synchronous standbys that transactions
 * must wait for replies from is specified in synchronous_standby_names.
 * This parameter also specifies a list of standby names and the method
 * (FIRST and ANY) to choose synchronous standbys from the listed ones.
 *
 * The method FIRST specifies a priority-based synchronous replication
 * and makes transaction commits wait until their WAL records are
 * replicated to the requested number of synchronous standbys chosen based
 * on their priorities. The standbys whose names appear earlier in the list
 * are given higher priority and will be considered as synchronous.
 * Other standby servers appearing later in this list represent potential
 * synchronous standbys. If any of the current synchronous standbys
 * disconnects for whatever reason, it will be replaced immediately with
 * the next-highest-priority standby.
 *
 * The method ANY specifies a quorum-based synchronous replication
 * and makes transaction commits wait until their WAL records are
 * replicated to at least the requested number of synchronous standbys
 * in the list. All the standbys appearing in the list are considered as
 * candidates for quorum synchronous standbys.
 *
 * If neither FIRST nor ANY is specified, FIRST is used as the method.
 * This is for backward compatibility with 9.6 or before where only a
 * priority-based sync replication was supported.
 *
 * Before the standbys chosen from synchronous_standby_names can
 * become the synchronous standbys they must have caught up with
 * the primary; that may take some time. Once caught up,
 * the standbys which are considered as synchronous at that moment
 * will release waiters from the queue.
 *
 * Portions Copyright (c) 2010-2017, PostgreSQL Global Development Group
 *
 * IDENTIFICATION
 *      src/backend/replication/syncrep.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <unistd.h>

#include "access/xact.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
#include "storage/pmsignal.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/ps_status.h"

/*
 * User-settable parameters for sync rep.
 *
 * SyncRepStandbyNames holds the raw synchronous_standby_names string; it is
 * only tested for emptiness in this file (see SyncStandbysDefined()).
 */
char       *SyncRepStandbyNames;

/* True when SyncRepStandbyNames is non-NULL and not the empty string. */
#define SyncStandbysDefined() \
    (SyncRepStandbyNames != NULL && SyncRepStandbyNames[0] != '\0')

/*
 * Per-process flag used by SyncRepReleaseWaiters(): while true, the next
 * time this walsender finds itself serving a sync standby it logs that
 * fact and clears the flag, so the message is emitted once per takeover.
 */
static bool announce_next_takeover = true;

/*
 * Parsed form of the synchronous standby configuration.  NULL is treated
 * throughout this file as "no sync standbys configured".  The code here
 * reads its syncrep_method, num_sync, nmembers and member_names fields.
 * NOTE(review): the code that assigns this pointer is not in the portion
 * of the file reviewed here.
 */
SyncRepConfigData *SyncRepConfig = NULL;

/*
 * Wait level used by SyncRepWaitForLSN() to pick a queue for commit
 * records; starts out as SYNC_REP_NO_WAIT.
 * NOTE(review): presumably updated from synchronous_commit elsewhere --
 * confirm.
 */
static int    SyncRepWaitMode = SYNC_REP_NO_WAIT;

/* Backend-side helpers: queue maintenance for the waiting process. */
static void SyncRepQueueInsert(int mode);
static void SyncRepCancelWait(void);
static int    SyncRepWakeQueue(bool all, int mode);

/* Walsender-side helpers: compute the positions waiters can be released to. */
static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr,
                     XLogRecPtr *flushPtr,
                     XLogRecPtr *applyPtr,
                     bool *am_sync);
static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
                           XLogRecPtr *flushPtr,
                           XLogRecPtr *applyPtr,
                           List *sync_standbys);
static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
                              XLogRecPtr *flushPtr,
                              XLogRecPtr *applyPtr,
                              List *sync_standbys, uint8 nth);

/* Walsender-side helpers: decide which standbys count as synchronous. */
static int    SyncRepGetStandbyPriority(void);
static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);

/* qsort comparator ordering LSNs from highest to lowest. */
static int    cmp_lsn(const void *a, const void *b);

#ifdef USE_ASSERT_CHECKING
/* Assertion-only consistency check of a wait queue's ordering. */
static bool SyncRepQueueIsOrderedByLSN(int mode);
#endif

/*
 * ===========================================================
 * Synchronous Replication functions for normal user backends
 * ===========================================================
 */

/*
 * Wait for synchronous replication, if requested by user.
 *
 * Initially backends start in state SYNC_REP_NOT_WAITING and then
 * change that state to SYNC_REP_WAITING before adding ourselves
 * to the wait queue. During SyncRepWakeQueue() a WALSender changes
 * the state to SYNC_REP_WAIT_COMPLETE once replication is confirmed.
 * This backend then resets its state to SYNC_REP_NOT_WAITING.
 *
 * 'lsn' represents the LSN to wait for.  'commit' indicates whether this LSN
 * represents a commit record.  If it doesn't, then we wait only for the WAL
 * to be flushed if synchronous_commit is set to the higher level of
 * remote_apply, because only commit records provide apply feedback.
 *
 * The function returns without waiting when sync rep is not requested, when
 * no standby names are configured, when the shared sync_standbys_defined
 * flag is clear, or when the shared LSN for the chosen mode has already
 * reached 'lsn'.  Otherwise it blocks on the process latch until released
 * by a walsender or until one of the cancel conditions in the loop below
 * fires.  Nothing is returned; on the cancel paths only a WARNING is
 * raised, never an ERROR.
 */
void
SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
{// #lizard forgives
    char       *new_status = NULL;
    const char *old_status;
    int            mode;

    /*
     * Cap the level for anything other than commit to remote flush only.
     * 'mode' is later used as the index into WalSndCtl->lsn[] and selects
     * which wait queue we join.
     */
    if (commit)
        mode = SyncRepWaitMode;
    else
        mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);

    /*
     * Fast exit if user has not requested sync replication, or there are no
     * sync replication standby names defined. Note that those standbys don't
     * need to be connected.
     */
    if (!SyncRepRequested() || !SyncStandbysDefined())
        return;

    /* We must not already be linked into any sync rep queue. */
    Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
    Assert(WalSndCtl != NULL);

    LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
    Assert(MyProc->syncRepState == SYNC_REP_NOT_WAITING);

    /*
     * We don't wait for sync rep if WalSndCtl->sync_standbys_defined is not
     * set.  See SyncRepUpdateSyncStandbysDefined.
     *
     * Also check that the standby hasn't already replied. Unlikely race
     * condition but we'll be fetching that cache line anyway so it's likely
     * to be a low cost check.
     */
    if (!WalSndCtl->sync_standbys_defined ||
        lsn <= WalSndCtl->lsn[mode])
    {
        LWLockRelease(SyncRepLock);
        return;
    }

    /*
     * Set our waitLSN so WALSender will know when to wake us, and add
     * ourselves to the queue.  All three steps happen under SyncRepLock so
     * a walsender never sees us queued with a stale waitLSN or state.
     */
    MyProc->waitLSN = lsn;
    MyProc->syncRepState = SYNC_REP_WAITING;
    SyncRepQueueInsert(mode);
    Assert(SyncRepQueueIsOrderedByLSN(mode));
    LWLockRelease(SyncRepLock);

    /* Alter ps display to show waiting for sync rep. */
    if (update_process_title)
    {
        int            len;

        /*
         * Build "<old title> waiting for X/X".  The suffix is at most 30
         * characters (13 for " waiting for ", up to 8 hex digits per half,
         * and the slash), so the 32 extra bytes plus terminator suffice.
         */
        old_status = get_ps_display(&len);
        new_status = (char *) palloc(len + 32 + 1);
        memcpy(new_status, old_status, len);
        sprintf(new_status + len, " waiting for %X/%X",
                (uint32) (lsn >> 32), (uint32) lsn);
        set_ps_display(new_status, false);
        new_status[len] = '\0'; /* truncate off " waiting ..." */
    }

    /*
     * Wait for specified LSN to be confirmed.
     *
     * Each proc has its own wait latch, so we perform a normal latch
     * check/wait loop here.
     */
    for (;;)
    {
        /* Must reset the latch before testing state. */
        ResetLatch(MyLatch);

        /*
         * Acquiring the lock is not needed, the latch ensures proper
         * barriers. If it looks like we're done, we must really be done,
         * because once walsender changes the state to SYNC_REP_WAIT_COMPLETE,
         * it will never update it again, so we can't be seeing a stale value
         * in that case.
         */
        if (MyProc->syncRepState == SYNC_REP_WAIT_COMPLETE)
            break;

        /*
         * If a wait for synchronous replication is pending, we can neither
         * acknowledge the commit nor raise ERROR or FATAL.  The latter would
         * lead the client to believe that the transaction aborted, which is
         * not true: it's already committed locally. The former is no good
         * either: the client has requested synchronous replication, and is
         * entitled to assume that an acknowledged commit is also replicated,
         * which might not be true. So in this case we issue a WARNING (which
         * some clients may be able to interpret) and shut off further output.
         * We do NOT reset ProcDiePending, so that the process will die after
         * the commit is cleaned up.
         */
        if (ProcDiePending)
        {
            ereport(WARNING,
                    (errcode(ERRCODE_ADMIN_SHUTDOWN),
                     errmsg("canceling the wait for synchronous replication and terminating connection due to administrator command"),
                     errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
            whereToSendOutput = DestNone;
            SyncRepCancelWait();
            break;
        }

        /*
         * It's unclear what to do if a query cancel interrupt arrives.  We
         * can't actually abort at this point, but ignoring the interrupt
         * altogether is not helpful, so we just terminate the wait with a
         * suitable warning.  Unlike the die case above, the pending flag is
         * cleared here and client output is left enabled.
         */
        if (QueryCancelPending)
        {
            QueryCancelPending = false;
            ereport(WARNING,
                    (errmsg("canceling wait for synchronous replication due to user request"),
                     errdetail("The transaction has already committed locally, but might not have been replicated to the standby.")));
            SyncRepCancelWait();
            break;
        }

        /*
         * If the postmaster dies, we'll probably never get an
         * acknowledgement, because all the wal sender processes will exit. So
         * just bail out.  Setting ProcDiePending here makes this path behave
         * like the administrator-shutdown path, minus the WARNING.
         */
        if (!PostmasterIsAlive())
        {
            ProcDiePending = true;
            whereToSendOutput = DestNone;
            SyncRepCancelWait();
            break;
        }

        /*
         * Wait on latch.  Any condition that should wake us up will set the
         * latch, so no need for timeout.
         */
        WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, -1,
                  WAIT_EVENT_SYNC_REP);
    }

    /*
     * We get here either because a WalSender has checked our LSN and has
     * removed us from queue, or because one of the cancel paths above called
     * SyncRepCancelWait(), which unlinks us itself.  Clean up
     * state and leave.  It's OK to reset these shared memory fields without
     * holding SyncRepLock, because any walsenders will ignore us anyway when
     * we're not on the queue.  We need a read barrier to make sure we see the
     * changes to the queue link (this might be unnecessary without
     * assertions, but better safe than sorry).
     */
    pg_read_barrier();
#ifdef NOT_USED    
    /*
     * We are hitting this Assert too often in our tests. This is a PG bug and
     * has been reported to the list. But turn this off until it gets fixed in
     * the upstream branch
     */
    Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
#endif    
    MyProc->syncRepState = SYNC_REP_NOT_WAITING;
    MyProc->waitLSN = 0;

    if (new_status)
    {
        /*
         * Reset ps display.  new_status was truncated back to the original
         * title right after the " waiting ..." form was installed.
         */
        set_ps_display(new_status, false);
        pfree(new_status);
    }
}

/*
 * Insert MyProc into the specified SyncRepQueue, maintaining sorted invariant.
 *
 * Usually we will go at tail of queue, though it's possible that we arrive
 * here out of order, so start at tail and work back to insertion point.
 */
static void
SyncRepQueueInsert(int mode)
{
    PGPROC       *proc;

    Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
    proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
                                   &(WalSndCtl->SyncRepQueue[mode]),
                                   offsetof(PGPROC, syncRepLinks));

    while (proc)
    {
        /*
         * Stop at the queue element that we should after to ensure the queue
         * is ordered by LSN.
         */
        if (proc->waitLSN < MyProc->waitLSN)
            break;

        proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
                                       &(proc->syncRepLinks),
                                       offsetof(PGPROC, syncRepLinks));
    }

    if (proc)
        SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
    else
        SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
}

/*
 * Acquire SyncRepLock and cancel any wait currently in progress.
 *
 * Used by the cancel paths of SyncRepWaitForLSN().  The queue membership
 * test is made while holding the lock, so it is safe against another
 * process unlinking us concurrently.  The state is reset to
 * SYNC_REP_NOT_WAITING whether or not we were still queued.
 */
static void
SyncRepCancelWait(void)
{
    LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
    /* Only unlink if nobody has removed us from the queue already. */
    if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
        SHMQueueDelete(&(MyProc->syncRepLinks));
    MyProc->syncRepState = SYNC_REP_NOT_WAITING;
    LWLockRelease(SyncRepLock);
}

/*
 * Make sure this backend is not left linked into a sync rep wait queue
 * when the process exits.
 *
 * The first, unlocked SHMQueueIsDetached() test is only a cheap filter so
 * that the common case (not queued) never touches SyncRepLock.  It cannot
 * be relied on for the actual removal: a process holding SyncRepLock may
 * take us off the queue between that test and our own lock acquisition,
 * and deleting an already-detached link would corrupt the queue.  So the
 * membership test is repeated once the lock is held, the same way
 * SyncRepCancelWait() does it.
 */
void
SyncRepCleanupAtProcExit(void)
{
    if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
    {
        LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);

        /* We may have been dequeued while waiting for the lock; recheck. */
        if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
            SHMQueueDelete(&(MyProc->syncRepLinks));

        LWLockRelease(SyncRepLock);
    }
}

/*
 * ===========================================================
 * Synchronous Replication functions for wal sender processes
 * ===========================================================
 */

/*
 * (Re)derive this walsender's sync standby priority from the current
 * configuration.  Called at WALSender startup and after each SIGHUP.
 *
 * The priority is recomputed with SyncRepGetStandbyPriority(); when it
 * differs from the value stored in MyWalSnd, the stored value is replaced
 * under SyncRepLock and the change is reported at DEBUG1.  When nothing
 * changed, no lock is taken and nothing is logged.
 */
void
SyncRepInitConfig(void)
{
    int            new_priority = SyncRepGetStandbyPriority();

    /* Nothing to do when the configuration left our priority untouched. */
    if (MyWalSnd->sync_standby_priority == new_priority)
        return;

    LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
    MyWalSnd->sync_standby_priority = new_priority;
    LWLockRelease(SyncRepLock);

    ereport(DEBUG1,
            (errmsg("standby \"%s\" now has synchronous standby priority %u",
                    application_name, new_priority)));
}

/*
 * Update the LSNs on each queue based upon our latest state. This
 * implements a simple policy of first-valid-sync-standby-releases-waiter.
 *
 * Other policies are possible, which would change what we do here and
 * perhaps also which information we store as well.
 */
void
SyncRepReleaseWaiters(void)
{// #lizard forgives
    volatile WalSndCtlData *walsndctl = WalSndCtl;
    XLogRecPtr    writePtr;
    XLogRecPtr    flushPtr;
    XLogRecPtr    applyPtr;
    bool        got_recptr;
    bool        am_sync;
    int            numwrite = 0;
    int            numflush = 0;
    int            numapply = 0;

    /*
     * If this WALSender is serving a standby that is not on the list of
     * potential sync standbys then we have nothing to do. If we are still
     * starting up, still running base backup or the current flush position is
     * still invalid, then leave quickly also.
     */
    if (MyWalSnd->sync_standby_priority == 0 ||
        MyWalSnd->state < WALSNDSTATE_STREAMING ||
        XLogRecPtrIsInvalid(MyWalSnd->flush))
    {
        announce_next_takeover = true;
        return;
    }

    /*
     * We're a potential sync standby. Release waiters if there are enough
     * sync standbys and we are considered as sync.
     */
    LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);

    /*
     * Check whether we are a sync standby or not, and calculate the synced
     * positions among all sync standbys.
     */
    got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);

    /*
     * If we are managing a sync standby, though we weren't prior to this,
     * then announce we are now a sync standby.
     */
    if (announce_next_takeover && am_sync)
    {
        announce_next_takeover = false;

        if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
            ereport(LOG,
                    (errmsg("standby \"%s\" is now a synchronous standby with priority %u",
                            application_name, MyWalSnd->sync_standby_priority)));
        else
            ereport(LOG,
                    (errmsg("standby \"%s\" is now a candidate for quorum synchronous standby",
                            application_name)));
    }

    /*
     * If the number of sync standbys is less than requested or we aren't
     * managing a sync standby then just leave.
     */
    if (!got_recptr || !am_sync)
    {
        LWLockRelease(SyncRepLock);
        announce_next_takeover = !am_sync;
        return;
    }

    /*
     * Set the lsn first so that when we wake backends they will release up to
     * this location.
     */
    if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr)
    {
        walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr;
        numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE);
    }
    if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr)
    {
        walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr;
        numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH);
    }
    if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr)
    {
        walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr;
        numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY);
    }

    LWLockRelease(SyncRepLock);

    elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X",
         numwrite, (uint32) (writePtr >> 32), (uint32) writePtr,
         numflush, (uint32) (flushPtr >> 32), (uint32) flushPtr,
         numapply, (uint32) (applyPtr >> 32), (uint32) applyPtr);
}

/*
 * Work out the Write, Flush and Apply positions that the current set of
 * sync standbys has collectively reached.
 *
 * Returns true and fills *writePtr, *flushPtr and *applyPtr when this
 * walsender serves a sync standby and at least SyncRepConfig->num_sync
 * sync standbys are available.  Otherwise returns false and leaves the
 * three positions as InvalidXLogRecPtr.
 *
 * *am_sync is always set: true if this walsender is connected to a sync
 * standby, false if not.
 *
 * Priority-based configurations use the oldest position among the sync
 * standbys; quorum-based ones use the num_sync'th latest.  (The Nth-latest
 * routine could compute the oldest too, but the dedicated routine is a
 * little cheaper.  XXX When the number of current sync standbys equals the
 * requested number, the oldest-position routine would also be valid for
 * quorum mode.)
 */
static bool
SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
                     XLogRecPtr *applyPtr, bool *am_sync)
{
    List       *candidates;
    bool        have_result;

    /* Start from "nothing known". */
    *am_sync = false;
    *writePtr = InvalidXLogRecPtr;
    *flushPtr = InvalidXLogRecPtr;
    *applyPtr = InvalidXLogRecPtr;

    /* Standbys regarded as synchronous right now. */
    candidates = SyncRepGetSyncStandbys(am_sync);

    /*
     * Positions are only meaningful if we ourselves serve a sync standby
     * and the configured number of sync standbys is actually present.
     */
    have_result = (*am_sync &&
                   SyncRepConfig != NULL &&
                   list_length(candidates) >= SyncRepConfig->num_sync);

    if (have_result)
    {
        if (SyncRepConfig->syncrep_method != SYNC_REP_PRIORITY)
            SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
                                          candidates, SyncRepConfig->num_sync);
        else
            SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
                                       candidates);
    }

    list_free(candidates);
    return have_result;
}

/*
 * Reduce the Write, Flush and Apply positions of the listed walsenders to
 * their respective minimums.
 *
 * 'sync_standbys' is an integer list of indexes into WalSndCtl->walsnds.
 * Each output is overwritten whenever it is still invalid or a standby
 * reports something older; the caller is expected to pass the outputs in
 * already set to InvalidXLogRecPtr.  With an empty list the outputs are
 * left untouched.
 */
static void
SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
                           XLogRecPtr *applyPtr, List *sync_standbys)
{
    ListCell   *lc;

    foreach(lc, sync_standbys)
    {
        WalSnd       *sender = &WalSndCtl->walsnds[lfirst_int(lc)];
        XLogRecPtr    seenWrite;
        XLogRecPtr    seenFlush;
        XLogRecPtr    seenApply;

        /* Take a consistent snapshot of this walsender's positions. */
        SpinLockAcquire(&sender->mutex);
        seenWrite = sender->write;
        seenFlush = sender->flush;
        seenApply = sender->apply;
        SpinLockRelease(&sender->mutex);

        /* Keep whichever is older, treating "invalid" as not yet set. */
        if (XLogRecPtrIsInvalid(*writePtr) || seenWrite < *writePtr)
            *writePtr = seenWrite;

        if (XLogRecPtrIsInvalid(*flushPtr) || seenFlush < *flushPtr)
            *flushPtr = seenFlush;

        if (XLogRecPtrIsInvalid(*applyPtr) || seenApply < *applyPtr)
            *applyPtr = seenApply;
    }
}

/*
 * Pick the Nth latest Write, Flush and Apply positions among the listed
 * walsenders.
 *
 * 'sync_standbys' is an integer list of indexes into WalSndCtl->walsnds.
 * The three kinds of position are collected into separate arrays, each
 * array is sorted from latest to oldest independently, and element
 * 'nth' - 1 of each is returned.  The three results may therefore come
 * from three different standbys.
 *
 * No bounds check is made here: the caller must pass an 'nth' between 1
 * and the length of 'sync_standbys'.
 */
static void
SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
                              XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth)
{
    int            count = list_length(sync_standbys);
    int            filled = 0;
    int            pick = nth - 1;
    XLogRecPtr *writes = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * count);
    XLogRecPtr *flushes = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * count);
    XLogRecPtr *applies = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * count);
    ListCell   *lc;

    /* Snapshot every listed walsender's positions under its spinlock. */
    foreach(lc, sync_standbys)
    {
        WalSnd       *sender = &WalSndCtl->walsnds[lfirst_int(lc)];

        SpinLockAcquire(&sender->mutex);
        writes[filled] = sender->write;
        flushes[filled] = sender->flush;
        applies[filled] = sender->apply;
        SpinLockRelease(&sender->mutex);

        filled++;
    }

    /* Latest first, independently for each kind of position. */
    qsort(writes, count, sizeof(XLogRecPtr), cmp_lsn);
    qsort(flushes, count, sizeof(XLogRecPtr), cmp_lsn);
    qsort(applies, count, sizeof(XLogRecPtr), cmp_lsn);

    /* Hand back the Nth latest of each. */
    *writePtr = writes[pick];
    *flushPtr = flushes[pick];
    *applyPtr = applies[pick];

    pfree(writes);
    pfree(flushes);
    pfree(applies);
}

/*
 * qsort() comparator for XLogRecPtr values that yields descending order:
 * the larger LSN sorts first.  Returns -1 when *a is larger, 1 when *b is
 * larger, and 0 when the two are equal.
 */
static int
cmp_lsn(const void *a, const void *b)
{
    const XLogRecPtr left = *((const XLogRecPtr *) a);
    const XLogRecPtr right = *((const XLogRecPtr *) b);

    if (left == right)
        return 0;

    return (left > right) ? -1 : 1;
}

/*
 * Build the list of walsender indexes currently regarded as sync standbys.
 * Returns NIL when there is no sync rep configuration or no qualifying
 * standby is connected.
 *
 * The caller must hold SyncRepLock.
 *
 * 'am_sync' may be NULL.  When it is not, *am_sync is set to true if this
 * walsender is connected to one of the returned standbys and to false
 * otherwise.
 */
List *
SyncRepGetSyncStandbys(bool *am_sync)
{
    /* Pessimistic default, in case we return early. */
    if (am_sync != NULL)
        *am_sync = false;

    /* No configuration means nobody is synchronous. */
    if (SyncRepConfig == NULL)
        return NIL;

    /* Dispatch on the configured selection method. */
    if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
        return SyncRepGetSyncStandbysPriority(am_sync);

    return SyncRepGetSyncStandbysQuorum(am_sync);
}

/*
 * Collect every walsender that qualifies as a quorum sync standby
 * candidate, returning their indexes into WalSndCtl->walsnds as an integer
 * list (NIL if there are none).
 *
 * A walsender qualifies when its slot is in use (pid != 0), it is in
 * streaming state, it has a non-zero sync_standby_priority, and it has
 * reported a valid flush position.
 *
 * The caller must hold SyncRepLock, and the configured method must be
 * quorum-based.
 *
 * 'am_sync' may be NULL.  When it is not, *am_sync is set to true if this
 * walsender is among the candidates; it is never set to false here, so the
 * caller is responsible for initialising it.
 */
static List *
SyncRepGetSyncStandbysQuorum(bool *am_sync)
{
    List       *candidates = NIL;
    int            slot;

    Assert(SyncRepConfig->syncrep_method == SYNC_REP_QUORUM);

    for (slot = 0; slot < max_wal_senders; slot++)
    {
        /* Use volatile pointer to prevent code rearrangement */
        volatile WalSnd *walsnd = &WalSndCtl->walsnds[slot];
        XLogRecPtr    flushed;
        WalSndState curstate;
        int            owner;

        /* Read the spinlock-protected fields in one go. */
        SpinLockAcquire(&walsnd->mutex);
        owner = walsnd->pid;
        flushed = walsnd->flush;
        curstate = walsnd->state;
        SpinLockRelease(&walsnd->mutex);

        /*
         * Skip slots that are unused, not streaming, not configured as
         * synchronous, or that have no valid flush position yet.
         */
        if (owner == 0 ||
            curstate != WALSNDSTATE_STREAMING ||
            walsnd->sync_standby_priority == 0 ||
            XLogRecPtrIsInvalid(flushed))
            continue;

        /* This one is a quorum candidate. */
        candidates = lappend_int(candidates, slot);

        if (am_sync != NULL && walsnd == MyWalSnd)
            *am_sync = true;
    }

    return candidates;
}

/*
 * Return the list of sync standbys chosen based on their priorities,
 * or NIL if no sync standby is connected.
 *
 * If there are multiple standbys with the same priority,
 * the first one found is selected preferentially.
 *
 * The caller must hold SyncRepLock. This function must be called only in
 * a priority-based sync replication.
 *
 * On return, *am_sync is set to true if this walsender is connecting to
 * sync standby. Otherwise it's set to false.  (This function itself only
 * ever stores true, or the pending-membership flag, into *am_sync; the
 * initial false comes from the caller.)  am_sync may be NULL.
 *
 * The returned list holds indexes into WalSndCtl->walsnds.  The work is
 * done in up to three stages: a scan of all walsender slots that picks up
 * priority-1 standbys directly and parks the rest in a "pending" list; a
 * shortcut that accepts the whole pending list when there are not more
 * eligible standbys than requested; and otherwise repeated scans of the
 * pending list in increasing priority-number order until num_sync
 * standbys have been collected.
 */
static List *
SyncRepGetSyncStandbysPriority(bool *am_sync)
{// #lizard forgives
    List       *result = NIL;
    List       *pending = NIL;
    int            lowest_priority;
    int            next_highest_priority;
    int            this_priority;
    int            priority;
    int            i;
    bool        am_in_pending = false;
    volatile WalSnd *walsnd;    /* Use volatile pointer to prevent code
                                 * rearrangement */

    Assert(SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY);

    /*
     * Priorities are numbered from 1, so the numerically largest (i.e.
     * lowest) possible priority equals the number of configured members.
     * One past that serves as a "none seen yet" sentinel.
     */
    lowest_priority = SyncRepConfig->nmembers;
    next_highest_priority = lowest_priority + 1;

    /*
     * Find the sync standbys which have the highest priority (i.e, 1). Also
     * store all the other potential sync standbys into the pending list, in
     * order to scan it later and find other sync standbys from it quickly.
     */
    for (i = 0; i < max_wal_senders; i++)
    {
        XLogRecPtr    flush;
        WalSndState state;
        int            pid;

        walsnd = &WalSndCtl->walsnds[i];

        /* Snapshot the spinlock-protected fields of this slot. */
        SpinLockAcquire(&walsnd->mutex);
        pid = walsnd->pid;
        flush = walsnd->flush;
        state = walsnd->state;
        SpinLockRelease(&walsnd->mutex);

        /* Must be active */
        if (pid == 0)
            continue;

        /* Must be streaming */
        if (state != WALSNDSTATE_STREAMING)
            continue;

        /* Must be synchronous */
        this_priority = walsnd->sync_standby_priority;
        if (this_priority == 0)
            continue;

        /* Must have a valid flush position */
        if (XLogRecPtrIsInvalid(flush))
            continue;

        /*
         * If the priority is equal to 1, consider this standby as sync and
         * append it to the result. Otherwise append this standby to the
         * pending list to check if it's actually sync or not later.
         */
        if (this_priority == 1)
        {
            result = lappend_int(result, i);
            if (am_sync != NULL && walsnd == MyWalSnd)
                *am_sync = true;
            if (list_length(result) == SyncRepConfig->num_sync)
            {
                list_free(pending);
                return result;    /* Exit if got enough sync standbys */
            }
        }
        else
        {
            pending = lappend_int(pending, i);
            /* Remember our own membership; it only matters if pending is used. */
            if (am_sync != NULL && walsnd == MyWalSnd)
                am_in_pending = true;

            /*
             * Track the highest priority among the standbys in the pending
             * list, in order to use it as the starting priority for later
             * scan of the list. This is useful to find quickly the sync
             * standbys from the pending list later because we can skip
             * unnecessary scans for the unused priorities.
             */
            if (this_priority < next_highest_priority)
                next_highest_priority = this_priority;
        }
    }

    /*
     * Consider all pending standbys as sync if the number of them plus
     * already-found sync ones does not exceed what the configuration
     * requests (note the comparison is "<=", so an exact match also takes
     * this shortcut).
     */
    if (list_length(result) + list_length(pending) <= SyncRepConfig->num_sync)
    {
        /*
         * NOTE(review): pending is pfree'd after the concatenation only
         * when both lists were non-empty; presumably list_concat hands back
         * one of its inputs unchanged when the other is NIL -- confirm
         * against the List implementation in use.
         */
        bool        needfree = (result != NIL && pending != NIL);

        /*
         * Set *am_sync to true if this walsender is in the pending list
         * because all pending standbys are considered as sync.
         */
        if (am_sync != NULL && !(*am_sync))
            *am_sync = am_in_pending;

        result = list_concat(result, pending);
        if (needfree)
            pfree(pending);
        return result;
    }

    /*
     * Find the sync standbys from the pending list.  Each pass of the outer
     * loop handles one priority value, starting from the best one seen in
     * the pending list and jumping straight to the next value actually
     * present.
     */
    priority = next_highest_priority;
    while (priority <= lowest_priority)
    {
        ListCell   *cell;
        ListCell   *prev = NULL;
        ListCell   *next;

        /* Reset the sentinel; this pass recomputes the next priority to try. */
        next_highest_priority = lowest_priority + 1;

        for (cell = list_head(pending); cell != NULL; cell = next)
        {
            i = lfirst_int(cell);
            walsnd = &WalSndCtl->walsnds[i];

            /* Fetch the successor now: 'cell' may be deleted below. */
            next = lnext(cell);

            this_priority = walsnd->sync_standby_priority;
            if (this_priority == priority)
            {
                result = lappend_int(result, i);
                if (am_sync != NULL && walsnd == MyWalSnd)
                    *am_sync = true;

                /*
                 * We should always exit here after the scan of pending list
                 * starts because we know that the list has enough elements to
                 * reach SyncRepConfig->num_sync.
                 */
                if (list_length(result) == SyncRepConfig->num_sync)
                {
                    list_free(pending);
                    return result;    /* Exit if got enough sync standbys */
                }

                /*
                 * Remove the entry for this sync standby from the list to
                 * prevent us from looking at the same entry again.  'prev'
                 * is deliberately left unchanged, since the cell before the
                 * deleted one is still the predecessor of 'next'.
                 */
                pending = list_delete_cell(pending, cell, prev);

                continue;
            }

            /* Not selected in this pass: it may define the next priority. */
            if (this_priority < next_highest_priority)
                next_highest_priority = this_priority;

            prev = cell;
        }

        priority = next_highest_priority;
    }

    /* never reached, but keep compiler quiet */
    Assert(false);
    return result;
}

/*
 * Determine the sync standby priority of the standby served by this
 * walsender.  Zero means "not a potential sync standby".
 *
 * The walsender's application_name is compared, ignoring case, against
 * each entry of SyncRepConfig->member_names in turn; an entry consisting
 * of "*" matches any name.  The entries are stored back to back, each
 * terminated by a NUL byte.  For a priority-based configuration the result
 * is the 1-based position of the first matching entry; for a quorum-based
 * configuration every listed standby gets priority 1.
 */
static int
SyncRepGetStandbyPriority(void)
{// #lizard forgives
    const char *entry;
    int            position;

    /*
     * Synchronous cascading replication is not supported, so a cascading
     * walsender never has a sync priority.
     */
    if (am_cascading_walsender)
        return 0;

    /* Without a standby list there is nothing to match against. */
    if (!SyncStandbysDefined() || SyncRepConfig == NULL)
        return 0;

    entry = SyncRepConfig->member_names;
    for (position = 1; position <= SyncRepConfig->nmembers; position++)
    {
        if (pg_strcasecmp(entry, application_name) == 0 ||
            strcmp(entry, "*") == 0)
        {
            /* In quorum mode all listed standbys share priority one. */
            return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? position : 1;
        }

        /* Step over this name and its terminator to reach the next one. */
        entry += strlen(entry) + 1;
    }

    /* Our name is not in the list. */
    return 0;
}

/*
 * Release waiting backends from the sync rep queue for the given wait mode.
 *
 * The queue is walked from its head.  Each released backend is unlinked
 * from the queue, has its syncRepState set to SYNC_REP_WAIT_COMPLETE, and
 * then has its process latch set.
 *
 * all:  if true, release every backend in the queue; if false, stop at the
 *       first backend whose waitLSN is beyond WalSndCtl->lsn[mode].
 * mode: index of the queue to process; must be in
 *       [0, NUM_SYNC_REP_WAIT_MODE).
 *
 * Returns the number of backends released.
 *
 * Must hold SyncRepLock.
 */
static int
SyncRepWakeQueue(bool all, int mode)
{
    volatile WalSndCtlData *walsndctl = WalSndCtl;
    PGPROC       *proc = NULL;
    PGPROC       *thisproc = NULL;
    int            numprocs = 0;

    Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
    /* The early exit in the loop below relies on the queue being sorted. */
    Assert(SyncRepQueueIsOrderedByLSN(mode));

    /*
     * Fetch the first waiter.  The queue head is passed as the "current"
     * element; presumably SHMQueueNext() then yields the first entry, or
     * NULL for an empty queue -- confirm against SHMQueueNext().
     */
    proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
                                   &(WalSndCtl->SyncRepQueue[mode]),
                                   offsetof(PGPROC, syncRepLinks));

    while (proc)
    {
        /*
         * Assume the queue is ordered by LSN: once one waiter is found whose
         * waitLSN has not been reached, no later waiter is examined.
         */
        if (!all && walsndctl->lsn[mode] < proc->waitLSN)
            return numprocs;

        /*
         * Move to next proc, so we can delete thisproc from the queue.
         * thisproc is valid, proc may be NULL after this.
         */
        thisproc = proc;
        proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
                                       &(proc->syncRepLinks),
                                       offsetof(PGPROC, syncRepLinks));

        /*
         * Remove thisproc from queue.
         */
        SHMQueueDelete(&(thisproc->syncRepLinks));

        /*
         * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
         * make sure that it sees the queue link being removed before the
         * syncRepState change.  Do not reorder the unlink, the barrier, and
         * the state store below.
         */
        pg_write_barrier();

        /*
         * Set state to complete; see SyncRepWaitForLSN() for discussion of
         * the various states.
         */
        thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;

        /*
         * Wake only when we have set state and removed from queue.
         */
        SetLatch(&(thisproc->procLatch));

        /* Count this backend as released. */
        numprocs++;
    }

    return numprocs;
}

/*
 * Bring the shared sync_standbys_defined flag in line with the current
 * setting of synchronous_standby_names.
 *
 * The checkpointer calls this as needed, so that backends do not stay
 * wedged forever once synchronous_standby_names has been unset.  Only one
 * process ever updates the flag, which makes it safe to compare against the
 * shared value without the lock; changing it does require SyncRepLock.
 */
void
SyncRepUpdateSyncStandbysDefined(void)
{
    bool        defined_now = SyncStandbysDefined();

    /* Unlocked comparison: nothing to do if the shared flag is current. */
    if (defined_now == WalSndCtl->sync_standbys_defined)
        return;

    LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);

    if (!defined_now)
    {
        int            mode;

        /*
         * synchronous_standby_names was reset to empty, so the user no
         * longer wants synchronous replication and there is no point in
         * backends continuing to wait.  Release every queue in full.
         */
        for (mode = 0; mode < NUM_SYNC_REP_WAIT_MODE; mode++)
            SyncRepWakeQueue(true, mode);
    }

    /*
     * Publish the new value while still holding the lock.  Backends may only
     * join the queue while synchronous standbys are defined; without this
     * interlock a backend that has not yet reloaded its configuration could
     * go to sleep on the queue after all current waiters were released, and
     * never be woken.
     */
    WalSndCtl->sync_standbys_defined = defined_now;

    LWLockRelease(SyncRepLock);
}

#ifdef USE_ASSERT_CHECKING
/*
 * Assertion helper: verify that the sync rep queue for the given wait mode
 * is in strictly increasing waitLSN order.
 *
 * Returns false as soon as an entry is found whose waitLSN is not greater
 * than that of the entry before it (the comparison starts from zero), and
 * true if the whole queue passes, including when it is empty.
 */
static bool
SyncRepQueueIsOrderedByLSN(int mode)
{
    XLogRecPtr    prevLSN = 0;
    PGPROC       *waiter;

    Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);

    for (waiter = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
                                          &(WalSndCtl->SyncRepQueue[mode]),
                                          offsetof(PGPROC, syncRepLinks));
         waiter != NULL;
         waiter = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
                                          &(waiter->syncRepLinks),
                                          offsetof(PGPROC, syncRepLinks)))
    {
        /*
         * Each entry must be strictly past its predecessor: equal LSNs are
         * rejected as well as out-of-order ones.
         */
        if (prevLSN >= waiter->waitLSN)
            return false;

        prevLSN = waiter->waitLSN;
    }

    return true;
}
#endif

/*
 * ===========================================================
 * Synchronous Replication functions executed by any process
 * ===========================================================
 */

/*
 * GUC check hook for synchronous_standby_names.
 *
 * A NULL or empty setting is accepted and stores NULL in *extra.  Any other
 * setting is run through the syncrep parser, and a malloc'd copy of the
 * parse result is handed back through *extra.
 *
 * Returns true if the value is acceptable.  Returns false, after recording
 * the reason through the GUC_check_* facilities, if the string does not
 * parse, if the requested number of synchronous standbys is not greater
 * than zero, or if memory for the copy cannot be obtained.
 *
 * "source" is not examined.
 */
bool
check_synchronous_standby_names(char **newval, void **extra, GucSource source)
{// #lizard forgives
    if (*newval != NULL && (*newval)[0] != '\0')
    {
        int            parse_rc;
        SyncRepConfigData *pconf;

        /* Reset communication variables to ensure a fresh start */
        syncrep_parse_result = NULL;
        syncrep_parse_error_msg = NULL;

        /* Parse the synchronous_standby_names string */
        syncrep_scanner_init(*newval);
        parse_rc = syncrep_yyparse();
        syncrep_scanner_finish();

        if (parse_rc != 0 || syncrep_parse_result == NULL)
        {
            GUC_check_errcode(ERRCODE_SYNTAX_ERROR);
            if (syncrep_parse_error_msg)
                GUC_check_errdetail("%s", syncrep_parse_error_msg);
            else
                GUC_check_errdetail("synchronous_standby_names parser failed");
            return false;
        }

        if (syncrep_parse_result->num_sync <= 0)
        {
            GUC_check_errmsg("number of synchronous standbys (%d) must be greater than zero",
                             syncrep_parse_result->num_sync);
            return false;
        }

        /* GUC extra value must be malloc'd, not palloc'd */
        pconf = (SyncRepConfigData *)
            malloc(syncrep_parse_result->config_size);
        if (pconf == NULL)
        {
            /*
             * The value itself parsed fine, so say what actually went wrong.
             * Returning false with no message recorded would leave the user
             * with no hint that memory, not the setting, was the problem.
             */
            GUC_check_errmsg("out of memory");
            return false;
        }
        memcpy(pconf, syncrep_parse_result, syncrep_parse_result->config_size);

        *extra = (void *) pconf;

        /*
         * We need not explicitly clean up syncrep_parse_result.  It, and any
         * other cruft generated during parsing, will be freed when the
         * current memory context is deleted.  (This code is generally run in
         * a short-lived context used for config file processing, so that will
         * not be very long.)
         */
    }
    else
        *extra = NULL;

    return true;
}

void
assign_synchronous_standby_names(const char *newval, void *extra)
{
    SyncRepConfig = (SyncRepConfigData *) extra;
}

/*
 * GUC assign hook for synchronous_commit.
 *
 * Translates the new setting into SyncRepWaitMode.  The three REMOTE_*
 * levels select the write, flush and apply wait modes respectively; every
 * other value selects SYNC_REP_NO_WAIT.  "extra" is not used.
 */
void
assign_synchronous_commit(int newval, void *extra)
{
    if (newval == SYNCHRONOUS_COMMIT_REMOTE_WRITE)
        SyncRepWaitMode = SYNC_REP_WAIT_WRITE;
    else if (newval == SYNCHRONOUS_COMMIT_REMOTE_FLUSH)
        SyncRepWaitMode = SYNC_REP_WAIT_FLUSH;
    else if (newval == SYNCHRONOUS_COMMIT_REMOTE_APPLY)
        SyncRepWaitMode = SYNC_REP_WAIT_APPLY;
    else
        SyncRepWaitMode = SYNC_REP_NO_WAIT;
}
